import visdom
import numpy as np
import sys
sys.path.append("../TimeSeriesTools")
import utils
from cerberus import Validator
import chart_studio.plotly as py
import plotly.express as px
import plotly.tools as tls
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import pandas as pd
from datetime import datetime
import mongodb_utils
db_host= 'localhost'
port = '28018'
db_name='TimeSeriesBench'
mongodb_client = mongodb_utils.mongodb_connect(db_host, port)
import kairosdb_utils
global kairosdb_server
kairosdb_server = "http://localhost:6060"
import influxdb_utils
db_host= 'localhost'
port = '9086'
db_name='TimeSeriesBench'
influxdb_client = influxdb_utils.influxdb_connect(db_host, port)
import warp10_utils
global warp10_server
warp10_server = "http://localhost:7070"
Cette fonction est utilisée pour lire tous les documents dans la base de mongodb, mais il reste un problème : mongodb n'est pas une base de données spécialement conçue pour les données de séries temporaires, donc c'est possible d'exister certains données dédoublées avec même timestamps. Soit on définit un règle avant le stockage de mongodb pour supprimer ou modifier les données dédboulées, ou soit on le néglige.
def get_collection_scheme(db_name,scheme_name):
db = mongodb_client[db_name]
schemes_coll = db['schemes']
scheme = schemes_coll.find({"name":scheme_name})
return scheme
def mongodb_find_all_data(db_name,coll_name,scheme):
data = mongodb_utils.get_all_data(mongodb_client,db_name,coll_name,scheme)
return data
def mongodb_find_data_select_by_tags(db_name,coll_name,tags,scheme):
data = mongodb_utils.get_data_select_by_tags(mongodb_client,db_name,coll_name,tags,scheme)
return data
def kairosdb_find_all_data(db_name,coll_name,scheme):
data = kairosdb_utils.get_all_data(kairosdb_server,db_name,coll_name,scheme)
return data
def kairosdb_find_data_select_by_tags(db_name,coll_name,tags,scheme):
data = kairosdb_utils.get_data_select_by_tags(kairosdb_server,db_name,coll_name,tags,scheme)
return data
def influxdb_find_all_data(db_name,coll_name,scheme):
data = influxdb_utils.get_all_data(influxdb_client,db_name,coll_name,scheme)
return data
def influxdb_find_data_select_by_tags(db_name,coll_name,tags,scheme):
data = influxdb_utils.get_data_select_by_tags(influxdb_client,db_name,coll_name,tags,scheme)
return data
def warp10_find_data_select_by_tags(db_name,coll_name,tags,scheme):
import json
res = warp10_utils.get_data_select_by_tags(warp10_server,db_name,coll_name,tags,scheme)
nb_docs = 0
data_list = []
for r in res :
for d in r:
nb_docs += len(d['v'])
for v in d['v']:
data_list.append([v[0], json.loads(v[4])])
tagname = 'TAG'
for i,(k,v) in enumerate(tags.items()):
tagname = tagname+'.'+v
cols = [ k for k in scheme.keys()]
results = []
for d in data_list:
data = d[0]
str_value = str(id)+';'+str(d[0])+';'+tagname+';'+str(d[1][0])+';'+str(d[1][1])
values = str_value.split(';')
results.append({ cols[i]:values[i] for i in range(len(cols))})
return results
Les objects de datetime ne peuvent pas directemant utilisés pour tracer l'axis X, donc il faut d'abord convertir les datetime à unix timestamp.
def to_unix_time(dt):
epoch = datetime.utcfromtimestamp(7200)
return (dt - epoch).total_seconds() * 1000
def str_to_unix(date):
dt = datetime.strptime(date, '%d/%m/%Y %H:%M:%S')
epoch = datetime.utcfromtimestamp(0)
return int((dt - epoch).total_seconds()) * 1000
Normalement cette étape est faite avant le stockage des données, vu que on ne sait pas encore les règles détaillés pour la validation les données, on insère quand même les données corruptibles dans la base.
Dans cet exemple, la source des données contient une ligne de données qui manque trois champs pour tester la validation des données.
Donc avant visualiser les courbes, on trouve d'abord les données corruptibles et les supprime.
Les règles de validation temporaires sont :
bson.objectid.ObjectIddef clean_data(scheme,data):
from cerberus import Validator
v = Validator(scheme)
for index,item in enumerate(data,start=0):
res = v.validate(item)
if (res == False):
print("corrupt data in line :",index,", error : ",v.errors)
del data[index]
scheme = get_collection_scheme(db_name,'SmartGrid')
scheme[0]
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
data = mongodb_find_all_data(db_name,coll_name,scheme[0]['value'])
print("number of docs",len(data))
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
tags = { 'Buiding' : 'CRY', 'Device' : 'CENTRALE_SOLAIRE', 'Measure' : 'CRY_act_prod_pow' }
data = mongodb_find_data_select_by_tags(db_name,coll_name,tags,scheme[0]['value'])
print("number of docs",len(data))
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
tags = { 'Buiding' : 'CRY', 'Device' : 'CENTRALE_SOLAIRE', 'Measure' : 'CRY_act_prod_pow' }
data = kairosdb_find_data_select_by_tags(db_name,coll_name,tags,scheme[0]['value'])
print("number of docs",len(data))
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
tags = { 'Buiding' : 'CRY', 'Device' : 'CENTRALE_SOLAIRE', 'Measure' : 'CRY_act_prod_pow' }
data = influxdb_find_data_select_by_tags(db_name,coll_name,tags,scheme[0]['value'])
print("number of docs",len(data))
%%time
coll_name='SmartGridCryolite20190101OneMonthV10000'
tags = { 'Buiding' : 'CRY', 'Device' : 'CENTRALE_SOLAIRE', 'Measure' : 'CRY_act_prod_pow' }
data = warp10_find_data_select_by_tags(db_name,coll_name,tags,scheme[0]['value'])
print("number of docs",len(data))
Les informations des données corruptibles seront affichées dans la console :
%%time
clean_data(scheme[0]['value'],data)
%%time
df = pd.DataFrame(data)
df[0:5]
df['value']
df.tagname.unique()
df2 = df.loc[df['tagname'] == 'CRY.CENTRALE_SOLAIRE.CRY_act_prod_pow']
df2[0:10]
axisX = df['timestamp']
# convertir cas MongoDB
df['timestamp'] = df['timestamp'].apply(str_to_unix)
plotly¶Pour éviter les données dédoublées, on recupère les premières 5000 lignes comme la source de données du plot.
Dans cet plot les courbes partagent les axis-X et axis-Y, donc on peut observer que les tendances des courbes ne sont pas très claire parce que les plages de valeur des colonnes de données sont très variées.
# Cas MongoDB
x = df2['timestamp'].values
nb_pts = len(x)
nb_pts
y_value=[float(item) for item in df2["value"]]
y_quality=[float(item)/100 for item in df2["quality"]]
x = df['timestamp'].values
nb_pts = len(x)
nb_pts
y_value=[float(item) for item in df["value"]]
y_quality=[float(item)/100 for item in df["quality"]]
%matplotlib inline
nb_pts=25000
fig = go.Figure()
fig.add_trace(go.Scatter(
x=axisX[0:nb_pts-1],
y=y_value[0:nb_pts-1],
name="value",
line_color='deepskyblue',
opacity=0.8))
fig.add_trace(go.Scatter(
x=axisX[0:nb_pts-1],
y=y_quality[0:nb_pts-1],
name="quality",
line_color='dimgray',
opacity=0.8))
# Use date string to set xaxis range
fig.update_layout(xaxis_range=[x[0],
x[nb_pts-1]],
title_text="smartgrid data series")
fig.show()
Cet plot contient trois subplots, dans la figure, les courbes partagent l'axis-X mais elles possèdent différents axis-Y.
On remarque que les tendances des courbes sont plus évidentes que la figure précédente.
%matplotlib inline
nb_pts=25000
fig = make_subplots(
rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.02
)
fig.add_trace(go.Scatter(
x=axisX[0:nb_pts-1],
y=y_value[0:nb_pts-1],
name="value",
line_color='deepskyblue',
line_width = 2,
opacity=0.8),
row=1, col=1)
fig.add_trace(go.Scatter(
x=axisX[0:nb_pts-1],
y=y_quality[0:nb_pts-1],
name="quality",
line_color='dimgray',
opacity=0.8),
row=2, col=1)
fig.update_layout(height=1000, width=1000,
title_text="smartGrid data series")
fig.show()
Avant cette étape, il faut lancer le service de visdom dans le terminal.
> visdom
L'address du web UI de visdom sera affichée dans la console.
vis = visdom.Visdom()
vis.plotlyplot(fig, win="mywin3")
vis.update_window_opts(win = "mywin3", opts=dict(width=1200, height=1500))
Après l'exécution de ce script, ouvrir le web UI de visdom, le plot est présenté dans une fenêtre intéractive.